package com.netty;

import com.netty.coder.WebMessageDecoder;
import com.netty.coder.WebMessageEncoder;
import com.netty.constant.Constants;
import com.netty.handler.ImWebScoketServerHandler;
import com.netty.handler.LoginAuthReqHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.handler.traffic.TrafficCounter;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class ImWebsocketServer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    private CIMWebSocketRequestHandler cimRequestHandler;
    private ChannelGroup channelGroup = null;
    public static String wsPath = "/ws";

    private int port;

    public ImWebsocketServer(CIMWebSocketRequestHandler cimRequestHandler, int port) {
        this.port = port;
        this.cimRequestHandler = cimRequestHandler;
    }
    public ImWebsocketServer(CIMWebSocketRequestHandler cimRequestHandler,int port, ChannelGroup channelGroup) {
        this.port = port;
        this.cimRequestHandler = cimRequestHandler;
        this.channelGroup = channelGroup;
    }

    /*static {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    TrafficCounter trafficCounter = Constants.TRAFFICHANDLER.trafficCounter();
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    final long totalRead = trafficCounter.cumulativeReadBytes();
                    final long totalWrite = trafficCounter.cumulativeWrittenBytes();
                    System.out.println("total read: " + (totalRead >> 10) + " KB");
                    System.out.println("total write: " + (totalWrite >> 10) + " KB");
                    System.out.println("流量监控: " + System.lineSeparator() + trafficCounter);
                }
            }
        }).start();
    }*/


    /**
     * 启动netty websocket
     *
     * @throws InterruptedException
     */
    public void startServer() throws InterruptedException {
        logger.info(">>>>>>>>>>>>>>>>>>>>>>start websocket ...<<<<<<<<<<<<<<<<<<<<<<");
        createEventGroup();
        // Server 服务启动
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);//多个端口号重用同一地址
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new HttpServerCodec());
                pipeline.addLast(new HttpObjectAggregator(Constants.ImserverConfig.MAX_AGGREGATED_CONTENT_LENGTH));
                // WebSocket数据压缩
                pipeline.addLast(new WebSocketServerCompressionHandler());
                //日志输出
                //pipeline.addLast(new LoggingHandler());
                //流量监控
                //pipeline.addLast(Constants.TRAFFICHANDLER);
                // 主要用于处理大数据流，比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的; 增加之后就不用考虑这个问题了
                pipeline.addLast(new ChunkedWriteHandler());
                pipeline.addLast("decoder", new WebMessageDecoder());
                pipeline.addLast("encoder", new WebMessageEncoder());
                pipeline.addLast("loginAuth",new LoginAuthReqHandler(cimRequestHandler));
                pipeline.addLast(new IdleStateHandler(Constants.ImserverConfig.READ_IDLE_TIME, Constants.ImserverConfig.WRITE_IDLE_TIME, 0, TimeUnit.SECONDS));
                pipeline.addLast(new WebSocketServerProtocolHandler(wsPath, null, true, Constants.ImserverConfig.MAX_FRAME_LENGTH));
                pipeline.addLast("handler", new ImWebScoketServerHandler(cimRequestHandler,channelGroup));
            }
        });

        ChannelFuture channelFuture = bootstrap.bind(port).syncUninterruptibly();
        channelFuture.channel().newSucceededFuture().addListener(future -> {
            String logBanner = "\n\n" +
                    "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n" +
                    "*                                                                                   *\n" +
                    "*                                                                                   *\n" +
                    "*                   Websocket Server started on port {}.                         *\n" +
                    "*                                                                                   *\n" +
                    "*                                                                                   *\n" +
                    "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n";
            logger.info(logBanner, port);
        });
        channelFuture.channel().closeFuture().addListener(future -> this.destroy(bossGroup, workerGroup));
    }

    /**
     * 销毁队列
     */
    public void destroy(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
        logger.info(">>>>>>>>>>>>>>>>>>>>>>destroy server<<<<<<<<<<<<<<<<<<<<<<");
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        logger.info(">>>>>>>>>>>>>>>>>>>>>>destroy server complate.<<<<<<<<<<<<<<<<<<<<<<");
    }

    /**
     * 根据系统信息创建对应的队列
     */
    private void createEventGroup() {
        ThreadFactory bossThreadFactory = r -> {
            Thread thread = new Thread(r);
            thread.setName("nio-boss-boss");
            return thread;
        };
        ThreadFactory workerThreadFactory = r -> {
            Thread thread = new Thread(r);
            thread.setName("nio-worker-work");
            return thread;
        };
        if (isLinuxSystem()) {
            bossGroup = new EpollEventLoopGroup(2);
            workerGroup = new EpollEventLoopGroup(4);
        } else {
            bossGroup = new NioEventLoopGroup(2);
            workerGroup = new NioEventLoopGroup(4);
        }
    }

    /**
     * 判断是否是linux
     *
     * @return
     */
    private boolean isLinuxSystem() {
        String osName = System.getProperty("os.name").toLowerCase();
        return false;
    }
}
